Skip to content

Conversation

@yanjunxiang-google
Copy link
Contributor

@yanjunxiang-google yanjunxiang-google commented Oct 8, 2025

This is to address a portion of the issue: #37088, i.e, close the gRPC stream once no further external processing needed.

Currently the ext_proc gRPC stream is opened when the 1st ProcessingRequest is sent to the ext_proc server. And it is closed during ext_proc filter destruction. This is wasting resource on both Envoy and ext_proc server side. For example, if envoy is configured to only send request headers, the gRPC stream is left open until all the way to the response is processed.

This PR is trying to close the ext_proc gRPC stream once Envoy detects no more external processing needed.

Signed-off-by: Yanjun Xiang <[email protected]>
@yanjunxiang-google
Copy link
Contributor Author

/retest

@yanjunxiang-google
Copy link
Contributor Author

/assign @yanavlasov @tyxia @stevenzzzz

Signed-off-by: Yanjun Xiang <[email protected]>
Signed-off-by: Yanjun Xiang <[email protected]>
Signed-off-by: Yanjun Xiang <[email protected]>
Copy link
Contributor

@yanavlasov yanavlasov left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

/wait


} // namespace

bool Filter::noExternalProcessInEncoding() const {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why not add this method to the EncodingProcessorState ?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done

// Close the gRPC stream if the last ProcessingResponse is received.
void Filter::closeGrpcStreamIfLastRespReceived(
const std::unique_ptr<envoy::service::ext_proc::v3::ProcessingResponse>& response) {
bool last_response = false;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is something that should part of the ProcessorState. After processing a response message it should return a flag indicating that a terminal state has been reached and gRPC stream can be closed.

Copy link
Contributor Author

@yanjunxiang-google yanjunxiang-google Oct 9, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

hmm, the idea is to decide whether to terminate the gRPC stream once the ext_proc filter receives the ProcessingResponse, i.e, at the center place inside Filter::onReceiveMessage(), as Envoy has enough information here to decide whether to terminate it then. Basically, this is determined by 1)whether the end-of-stream is received, 2) and whether Envoy needs to send more data to the ext_proc server based on filter configuration. Hooking the logic to ProcessorState will unavoidably spread the logic to each of the handling header response, handling body response(different modes), and handling trailer response, which will make it error prone and hard to maintain.

Signed-off-by: Yanjun Xiang <[email protected]>
Copy link
Contributor

@stevenzzzz stevenzzzz left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hooah. You are fighting a beast here. :)

high level, since this is a behavior change that impacts prod traffic, could you guard this change with a feature flag?

Copy link
Contributor

@stevenzzzz stevenzzzz left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

left some comments, pls consider some common cases like(there for sure are more):

  1. CONTINUE_AND_REPLACE finishes one direction's all events.
  2. EOS from trailers implicitly terminates body.
  3. only check if-last-reponse after response is "processed", on error cases, stream will already be closed.

On a high level, if we have gone this far already, let's also consider to wait-for-trailers after the half close been sent. but that could be in another PR I assume.

const envoy::service::ext_proc::v3::ProcessingResponse& response) {
switch (state.bodyMode()) {
case ProcessingMode::BUFFERED:
case ProcessingMode::BUFFERED_PARTIAL:
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

much appreciate if you could add a method comment talking about what this method is about, and comment heavily on which modes are not supported yet.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks, could you pls also update in the change log?

break;
case ProcessingMode::STREAMED:
if (!state.chunkQueue().empty()) {
return state.chunkQueue().queue().front()->end_stream;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

do we need to consider the corner case that body is done, because we received trailers already here?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There are other corner cases:
If CONTINUE_AND_REPLACE is set, it also means we can safely half-close.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Moved the stream optimization logic to the end of the onReceiveMessage, i.e, after the ProcessingResponse is already processed. This will automatically count in CONTINUE_AN_REPLACE, as such message will modify the filter processing modes configuration, thus be counted in the check.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

do we need to consider the corner case that body is done, because we received trailers already here?

This is a little bit tricky. Can we skip this corner case? If we missed closing the stream in the decoding path, the stream closing added at the encodeHeaders() will catch it and cleanup any way.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think the end goal is to close/finish the whole side stream before destructing the filter (before mainstream tears down).
It's okay to skip, as we can fall back to the usual closure for now, but pls comment here which cases are not considered yet.

case ProcessingMode::FULL_DUPLEX_STREAMED: {
const envoy::service::ext_proc::v3::BodyResponse* body_response = nullptr;
if (response.has_request_body()) {
body_response = &response.request_body();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit, there should not be a if-else here.
instead, use the state's type to get the real body response field?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think the corner cases above apply here as well: trailers received, CONTINUE_AND_REPLACE set.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit, there should not be a if-else here. instead, use the state's type to get the real body response field?

done

void closeStreamMaybeGraceful();

// Close the gRPC stream if the last ProcessingResponse is received.
void closeGrpcStreamIfLastRespReceived(
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit, you could use "const ProcessingResponse&" here to save some dereferences.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done

bool last_response = false;

switch (response->response_case()) {
case ProcessingResponse::ResponseCase::kRequestHeaders:
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

if the ext_proc sends invalid response, e.g. requestHeaders on Body events, it's a bad state that would cause stream close already.
Shall we evaluate all the cases after the response is processed?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Moving the stream closing logic to the end of onReceiveMessage() automatically count in this as well as stream_ become nullptr in these conditions.

switch (response->response_case()) {
case ProcessingResponse::ResponseCase::kRequestHeaders:
if ((decoding_state_.hasNoBody() ||
(decoding_state_.bodyMode() == ProcessingMode::NONE && !decoding_state_.sendTrailers())) &&
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

other case: body mode confgiured, but trailers received.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

trailer mode set, but EoS seen in headers or Body already

Copy link
Contributor Author

@yanjunxiang-google yanjunxiang-google Oct 14, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I added the cases you mentioned in the TBD list in the PR description, please take a look. The goal of this PR is to get the normal case optimization working, then we can deal with the corner cases in the future enhancement. BTW, if some corner cases are missed in the decoding path, the stream closing logic added in encodeHeaders() will catch them all.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

let's add the TODO comments in code. those are not "corner cases" IMHO.
Since you started this, I hope you could finish it :)

}
break;
case ProcessingResponse::ResponseCase::kRequestBody:
if (isLastBodyResponse(decoding_state_, *response) && encoding_state_.noExternalProcess()) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

what if trailers are configured?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Here we detect EoS is received with body, so no trailers in this request. Thus even filter is configured to send trailers, the external processing in this direction is completed.

Signed-off-by: Yanjun Xiang <[email protected]>
@botengyao
Copy link
Member

Waiting for addressing comments.

/wait

Signed-off-by: Yanjun Xiang <[email protected]>
Signed-off-by: Yanjun Xiang <[email protected]>
Signed-off-by: Yanjun Xiang <[email protected]>
@repokitteh-read-only
Copy link

CC @envoyproxy/runtime-guard-changes: FYI only for changes made to (source/common/runtime/runtime_features.cc).

🐱

Caused by: #41425 was synchronize by yanjunxiang-google.

see: more, trace.

@yanjunxiang-google
Copy link
Contributor Author

left some comments, pls consider some common cases like(there for sure are more):

  1. CONTINUE_AND_REPLACE finishes one direction's all events.
  2. EOS from trailers implicitly terminates body.
  3. only check if-last-reponse after response is "processed", on error cases, stream will already be closed.

On a high level, if we have gone this far already, let's also consider to wait-for-trailers after the half close been sent. but that could be in another PR I assume.

  1. is considered by moving the stream closing check at the end of the onReceiveMessage().
  2. meaning: if trailers is received, if the body response is received, and if no more body chunks in the state chunk queue, that's the last body response. On top of that, if the trailer processing mode is SKIP, then external processing in the current direction is done. I am leaving this case optimization to the future enhancement. Note, we also have a catch all optimization added in encodeHeaders() which will help this situation a little bit.
  3. is considered by moving the stream closing check at the end of the onReceiveMessage(), as in this case, stream_ is a nullptr.

Signed-off-by: Yanjun Xiang <[email protected]>
Signed-off-by: Yanjun Xiang <[email protected]>
@yanjunxiang-google
Copy link
Contributor Author

Hooah. You are fighting a beast here. :)

high level, since this is a behavior change that impacts prod traffic, could you guard this change with a feature flag?

Thanks for the comments! done

Signed-off-by: Yanjun Xiang <[email protected]>
Signed-off-by: Yanjun Xiang <[email protected]>
Copy link
Member

@tyxia tyxia left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for working on this!

+1 on runtime guard to protect this change,. The processing mode/ext_proc is getting more and more complicated )

Signed-off-by: Yanjun Xiang <[email protected]>
@yanjunxiang-google
Copy link
Contributor Author

yanjunxiang-google commented Oct 17, 2025

The initial optimization does not apply for a few scenarios listed below, which is TBD in the future:

  1. For BUFFERED and BUFFERED_PARTIAL body mode
  2. During header response processing, if request does not have body but has trailer, and trailer is received, at same time if body sending mode is not NONE, and trailer sending mode is SKIP(unlikely request pattern, corner case)
  3. During header response processing, if request has body and end_of_stream is received with body, at same time if body sending mode is NONE, and trailer sending mode is SEND(unlikely processing mode configuration, corner case).

@yanjunxiang-google
Copy link
Contributor Author

Kind Ping!

@yanjunxiang-google
Copy link
Contributor Author

/retest

Copy link
Contributor

@stevenzzzz stevenzzzz left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for working on this!

case ProcessingMode::BUFFERED:
case ProcessingMode::BUFFERED_PARTIAL:
// TODO: - skip stream closing optimization for BUFFERED and BUFFERED_PARTIAL for now.
break;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: just return

const envoy::service::ext_proc::v3::ProcessingResponse& response) {
switch (state.bodyMode()) {
case ProcessingMode::BUFFERED:
case ProcessingMode::BUFFERED_PARTIAL:
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks, could you pls also update in the change log?

break;
case ProcessingMode::STREAMED:
if (!state.chunkQueue().empty()) {
return state.chunkQueue().queue().front()->end_stream;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think the end goal is to close/finish the whole side stream before destructing the filter (before mainstream tears down).
It's okay to skip, as we can fall back to the usual closure for now, but pls comment here which cases are not considered yet.

}
break;
case ProcessingMode::FULL_DUPLEX_STREAMED: {
if (body_response.has_response() && body_response.response().has_body_mutation()) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

not sure if it's my bad taste, but we can ignore all these has_XXX IMHO.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If we do not check these has_xxx, and if they are not set, the code will rely on the default behavior of these messages, which is error-prone.

} // namespace

void Filter::closeGrpcStreamIfLastRespReceived(const ProcessingResponse& response,
const bool is_last_body_resp) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't fully follow why is_last_body_resp is an input parameter, could it be inferred using the response?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good question!

For FULL_DUPLEX_STREAMED mode, it can be inferred from the response.

For STREAMED mode, the response does not contain EOS. In this case, the EOS info is stored in the Envoy chunk_queue element. However, as we are doing the stream_closing after the response message already processed, i.e, the chunk_queue element is already popped by then. So, for STREAMED mode, we have to detect whether this is_last_body_resp before we process the response message. Then passing the is_last_body_resp to the stream_closing logic.

last_response = true;
}
break;
case ProcessingResponse::ResponseCase::kRequestBody:
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this didn't consider request trailers: if EoS is set by decodeTrailers, last_response can be inferred by trailers mode and the queue state for streamed mode.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We can do this in future PR. Some of the cases are listed here: #41425 (comment).

IMHO, in this PR, we just close the side stream for the cases we are 100% sure. For corner cases, they will be taken care of by the fall back mechanism which is filter onDestroy(). We don't have to be perfect in this PR. We can iterate and enhance. From another angle, it's probably bad to accidently close a stream we should not close. So, let's do this step-by-step.

break;
case ProcessingMode::STREAMED:
if (!state.chunkQueue().empty()) {
return state.chunkQueue().queue().front()->end_stream;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

if you do the evaluation after the response is "processed". then no need to check on state.chunkQueue().queue().front()->end_stream().
queue empty + state.completeBodyAvailable() works the same

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

queue empty + state.completeBodyAvailable() is ambiguous here. It may mean two different cases:

  1. The body chunk is the last body with EoS = true;
  2. or The body chunk is the last body with EoS = false, and trailer is received.

For case 1), it is equivalent to state.chunkQueue().queue().front()->end_stream(), which we can for sure close the stream. However, for case 2), it needs cross check the trailer processing mode to determine whether we can close the stream. We can do this in a follow up PR.

break;
case ProcessingResponse::ResponseCase::kResponseHeaders:
if (encoding_state_.hasNoBody() ||
(encoding_state_.bodyMode() == ProcessingMode::NONE && !encoding_state_.sendTrailers())) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

if body_replaced_ is true, there is no more message to send as well.

Copy link
Contributor Author

@yanjunxiang-google yanjunxiang-google Oct 24, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The logic considered body_replaced_ already as CONTINUE_AND_REPLACE processing will also end up with

body_mode_ = ProcessingMode::NONE;
send_trailers_ = false;

So, the logic can capture this case.

break;
case ProcessingResponse::ResponseCase::kResponseBody:
if (is_last_body_resp) {
last_response = true;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

if not in this CL, comment on the case that's discussed on the request path

verifyDownstreamResponse(*response, 200);
}

TEST_P(ExtProcIntegrationTest, ServerWaitforEnvoyHalfCloseThenCloseStream) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

can we have a parameterized test that use enumerate through the response types and possible "shortcut switches" (i.e. eos-in-mxn-mode, CONTINUE_AND_REPLACE in header response etc) and check whether an early half-close is sent?
I know this is a lot of cases, but with this set up, we will have confidence in the changes we are making in this PR and upcoming PRs.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please check the changes in filter_test.cc and filter_full_duplex_test.cc. The config_->stats().streams_closed_ counters are specially checked to make sure we are now closing the streams ASAP in those cases.

@yanjunxiang-google
Copy link
Contributor Author

/retest

1 similar comment
@yanjunxiang-google
Copy link
Contributor Author

/retest

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

5 participants